package custom;

import beans.SensorReading;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

/**
 * Custom punctuated timestamp extractor and watermark assigner for
 * {@link SensorReading} streams.
 *
 * <p>Timestamps are read from each element via {@link SensorReading#getTimestamp()}.
 * A watermark is produced only for elements whose id equals {@code "sensor_1"};
 * for every other element {@code null} is returned from
 * {@link #checkAndGetNextWatermark(SensorReading, long)}.
 *
 * @author lvbingbing
 * @date 2022-01-03 19:57
 */
public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading> {

    /** Id of the only sensor whose elements trigger a watermark. */
    private static final String WATERMARK_SENSOR_ID = "sensor_1";

    /**
     * Amount subtracted from an element's timestamp to obtain the watermark.
     * NOTE(review): presumably milliseconds (i.e. one minute) - confirm against
     * the unit used by SensorReading timestamps.
     */
    private static final long BOUND = 60 * 1000L;

    /**
     * Decides whether the given element triggers a new watermark.
     *
     * @param lastElement        the element that was just processed
     * @param extractedTimestamp the timestamp previously extracted for that element
     * @return a watermark at {@code extractedTimestamp - BOUND} when the element's id is
     *         {@code "sensor_1"}; {@code null} otherwise (including when the id is {@code null})
     */
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {
        // Only insert watermarks for the data coming from sensor_1.
        // Constant-first equals keeps a reading with a null id from throwing an NPE.
        if (WATERMARK_SENSOR_ID.equals(lastElement.getId())) {
            return new Watermark(extractedTimestamp - BOUND);
        }
        return null;
    }

    /**
     * Extracts the event timestamp carried by the element itself.
     *
     * @param element                  the element to read the timestamp from
     * @param previousElementTimestamp ignored by this implementation
     * @return the value of {@code element.getTimestamp()}
     */
    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        return element.getTimestamp();
    }
}
